Amazon Managed Service for Apache Flink で異常検知(Random Cut Forest)をやってみた

Amazon Managed Service for Apache Flink で異常検知(Random Cut Forest)をやってみた

Clock Icon2024.10.21

はじめに

コンサルティング部の神野です。

Kinesis Analyticsでは異常検知を行うためのRandom Cut Forestの組み込み関数が用意されていますが、
Amazon Managed Service for Apache Flinkでは同様の機能が用意されていません。そのため、Amazon Managed Service for Apache Flinkで独自にRandom Cut Forestを実装する必要があります。

どのように対応すればよいか調べているとAWSの公式ブログでやり方が紹介されていました。
Random Cut Forest用のモジュールをAWS公式が提供しているため、そちらを下記記事のようにJavaで実装すれば実現できるみたいです。

Amazon Managed Service for Apache Flink のランダムカットフォレストによるリアルタイム異常検出

ただ、CFnリンクでサンプルが構築されるもののどういった作りか不明瞭なので、
少しアレンジおよびコンソールからリソースを作成して進めていきたいと思います。

Random Cut Forestについて

Random Cut Forest(RCF)は、ストリーミングデータの異常検知に適した教師なし学習アルゴリズムの1つです。

RCFの特徴

  1. 適応性: データの分布が時間とともに変化しても、モデルが自動的に適応
  2. スケーラビリティ: 大規模なデータストリームでも効率的に動作
  3. 解釈可能性: 異常スコアを提供し、どの程度異常かを数値化
  4. リアルタイム処理: ストリーミングデータに対してリアルタイムで異常を検出

今回はこの学習アルゴリズムを活用して、異常検知を行います。

構築するシステム構成図

Amazon Managed Service for Apache Flinkでストリーミングデータのリアルタイム分析を実施します。
異常があった場合はSNSで通知し、異常検知データ用のStreamへ出力する構成とします。
CleanShot 2024-10-19 at 11.48.19@2x

AWSの準備

先にAWSの利用するリソースを作成します。

  • Kinesis Stream
    • Input用のStream
    • 異常検知したデータが出力されるOutput用のStream
  • SNS
    • 異常検知時に送付するトピック
  • S3
    • Flinkアプリケーションのjarファイル格納用バケット

Kinesis Stream

Input用とOutput用にそれぞれ作成します。

  1. Kinesisの画面でデータストリームタブを選択し、データストリームの作成ボタンを押下
    CleanShot 2024-10-20 at 00.57.24@2x
  2. データストリーム名はIn-RCFData-StreamOut-RCFData-Streamをそれぞれ入力し、今回は検証用のためシャード数1で容量はプロビジョンドを選択してデータストリームの作成ボタンを押下
    CleanShot 2024-10-20 at 00.58.12@2x

SNS

  1. SNSの画面に遷移し、トピックタブを選択してトピックの作成ボタンを押下
    CleanShot 2024-10-20 at 01.01.22@2x

  2. タイプはスタンダードを選択し、名前はanomaly_detectionを入力してトピックの作成ボタンを押下
    CleanShot 2024-10-20 at 01.02.09@2x

  3. 作成完了後は配信先のサブスクリプションを作成します。

    CleanShot 2024-10-20 at 01.02.41@2x

  4. プロトコルはEメールを選択し、エンドポイントは異常検知を知らせたいメールアドレスを指定(今回は自分のメールアドレス)してサブスクリプションの作成ボタンを押下
    CleanShot 2024-10-20 at 01.04.00@2x

  5. 作成後は指定したメールアドレスにサブスクリプションの確認メールが届くので、Confirm Subscription リンクを押下
    CleanShot 2024-10-20 at 01.05.19@2x

  6. リンクを押下後、下記画面が表示されればSubscriptionへの紐付けが完了
    CleanShot 2024-10-20 at 01.05.49@2x

S3

Random Cut Forest用のjarファイルを格納するためのバケットを作成します。

  1. S3の画面へ遷移しバケットタブからバケットを作成ボタンを押下
    CleanShot 2024-10-20 at 01.07.38@2x
  2. バケットタイプは汎用、バケット名はrcf-jar-repository20241018を入力してそれ以外はデフォルト値から変更せず作成
    CleanShot 2024-10-20 at 01.08.45@2x

IAM

Apache Flink実行用のIAMロールおよびポリシーを作成します。
付与する権限としては下記となります。

  • S3バケットへのアクセス権
  • Kinesis Streamへのアクセス権
  • SNSへのアクセス権
  • CloudWatch Logsへのアクセス権

ポリシー

  1. 下記IAMポリシーをコピーしてコンソールから作成する。YOUR_ACCOUNT_IDは自分のアカウントIDを入力

    作成するIAMポリシー
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                    "kinesis:ListShards",
                    "kinesis:PutRecord",
                    "kinesis:PutRecords"
                ],
                "Resource": [
                    "arn:aws:kinesis:ap-northeast-1:YOUR_ACCOUNT_ID:stream/In-RCFData-Stream",
                    "arn:aws:kinesis:ap-northeast-1:YOUR_ACCOUNT_ID:stream/Out-RCFData-Stream"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::rcf-jar-repository-20241018",
                    "arn:aws:s3:::rcf-jar-repository-20241018/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "sns:Publish"
                ],
                "Resource": "arn:aws:sns:ap-northeast-1:YOUR_ACCOUNT_ID:anomaly_detection"
            },
          	{
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": "arn:aws:logs:ap-northeast-1:YOUR_ACCOUNT_ID:*"
            }
        ]
    }
    

    CleanShot 2024-10-19 at 00.14.41@2x

  2. ポリシー名ApacheFlinkRCFCustomPolicyを入れてポリシーの作成ボタンを押下
    CleanShot 2024-10-19 at 00.17.08@2x

ロール

  1. IAMの画面へ遷移し、ロールを作成ボタンを押下
    CleanShot 2024-10-19 at 00.10.23@2x

  2. AWSのサービスを選択し、サービスまたはユースケースにKinesis を選択、更にKinesis Analyticsを選択
    CleanShot 2024-10-19 at 00.12.08@2x

  3. 先ほど作成したポリシーApacheFlinkRCFCustomPolicyを選択して次へ
    CleanShot 2024-10-19 at 00.19.05@2x

  4. ロール名ApacheFlinkRCFExecutionRoleをとつけてロールを作成ボタンを押下

    CleanShot 2024-10-19 at 00.21.17@2x

以上で準備完了です!次はコードをビルドしていきます。

サンプルのリポジトリ

サンプルリポジトリがAWS公式で提供されているため、cloneしてきます。

今回使用するサンプルリポジトリ

amazon-kinesis-data-analytics-examples

実行コマンド
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git

cloneすると色々なサンプルがありますが、今回はAnomalyDetection/RandomCutForest配下のモジュールを使用します。

前提

今回はJavaのIDEとしてIntelliJ、またJava SDKはAmazon Corettoを使用します。

  • IDE・・・IntelliJ IDEA 2024.2.3 (Community Edition)
  • Java・・・Amazon Corretto 22.0.2 - aarch64

実装

クローンしたフォルダーをIntelliJで開きます。
<clone先のパス>/amazon-kinesis-data-analytics-examples/AnomalyDetection/RandomCutForest/

CleanShot 2024-10-20 at 01.11.07@2x

pom.xml

開いた後はpom.xmlにAWS SDK(SNS)の依存関係を追加します。

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	・・・
+	<dependencyManagement>
+          <dependencies>
+              <dependency>
+                  <groupId>software.amazon.awssdk</groupId>
+                  <artifactId>bom</artifactId>
+                  <version>${aws.java.sdk.version}</version>
+                  <type>pom</type>
+                  <scope>import</scope>
+              </dependency>
+          </dependencies>
+  </dependencyManagement>

	<dependencies>
+          <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sns</artifactId>
+          </dependency>
    ・・・
	</dependencies>
</project>

FlinkStreamingJob.java

このソースコードに下記コードを追加します。追加した具体的な内容としては下記となります。

  • それぞれの温度を異常かどうかのscoreを算出し、異常値はscoreが高くなるため、閾値3.0を超えたら異常なデータとみなす
  • 異常値を検出した場合はSNSトピックへメッセージを配信
  • 異常値はOutput用のStreamへ配信

送信された温度が異常かそうでないか判断して、異常値の場合は通知&別Streamへ配信するといった流れになります。

FlinkStreamingJob.java
//・・・省略
+ import software.amazon.awssdk.services.sns.SnsClient;
+ import software.amazon.awssdk.services.sns.model.PublishRequest;
+ import software.amazon.awssdk.regions.Region;

public class FlinkStreamingJob {
  	// 閾値
+   private static final double ANOMALY_THRESHOLD = 3.0;

	// 追加
  	// SNS配信用の関数
+	private static void sendSnsNotification(OutputData data, String region, String snsTopicArn) {
+       SnsClient snsClient = SnsClient.builder()
+               .region(Region.of(region))
+               .build();
+
+       String message = String.format("Anomaly detected: Time=%s, Value=%f, Score=%f",
+               data.getTime(), data.getValue(), data.getScore());
+
+       PublishRequest publishRequest = PublishRequest.builder()
+               .topicArn(snsTopicArn)
+               .message(message)
+               .build();
+
+       snsClient.publish(publishRequest);
+    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

-		Properties applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties().get("RcfExampleEnvironment");
-       String region = applicationProperties.getProperty("region", "us-west-2");
-       String inputStreamName = applicationProperties.getProperty("inputStreamName", "ExampleInputStream-RCF");
-       String outputStreamName = applicationProperties.getProperty("outputStreamName", "ExampleOutputStream-RCF");
+       Properties applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties().get("RcfSampleApplicationEnvironment");
+       String inputStreamName = applicationProperties.getProperty("inputStreamName", "");
+       String outputStreamName = applicationProperties.getProperty("outputStreamName", "");
+       String snsTopicArn = applicationProperties.getProperty("snsTopicArn", "");

        // 省略

        // 異常値のフィルタリングとSNS通知処理を追加
+        DataStream<OutputData> anomalyStream = processedStream
+                .filter(data -> data.getScore() >= ANOMALY_THRESHOLD)
+                .process(new ProcessFunction<OutputData, OutputData>() {
+                    @Override
+                    public void processElement(OutputData data, Context context, Collector<OutputData> collector) throws Exception {
+                        sendSnsNotification(data, region);
+                        collector.collect(data);
+                    }
+                });

-		source
-                .process(randomCutForestOperator, TypeInformation.of(OutputData.class)).setParallelism(1)
-                .sinkTo(createSink(outputStreamName, region));
+        // 異常値のみを出力Streamに送信
+        anomalyStream.sinkTo(createSink(outputStreamName, region));

        env.execute();
    }
}

これで一通り完了したので.jar ファイルを作成します。

jarファイル作成

  1. ファイルタブから「プロジェクト構造...」を選択
    CleanShot 2024-10-20 at 01.11.37@2x
  2. アーティファクトタブから+>JAR>依存関係を持つモジュールから...を選択
    CleanShot 2024-10-20 at 01.12.26@2x
  3. メインクラスsoftware.amazon.flink.example.FlinkStreamingJobを指定
    CleanShot 2024-10-20 at 01.13.23@2x
  4. OKボタンを押下
    CleanShot 2024-10-20 at 01.13.56@2x
  5. アーティファクトのビルドを実施
    CleanShot 2024-10-20 at 01.14.41@2x
  6. ビルドが完了したら該当のjarファイルが生成されているか確認(flink-random-cut-forest-example.jar
    CleanShot 2024-10-20 at 01.16.02@2x

jarファイル格納

作成したjarファイルを事前にS3バケットrcf-jar-repository-20241018に格納します。

CleanShot 2024-10-20 at 01.18.24@2x

ここではApache Flink アプリケーションを作成していきます。

  1. Managed Apache Flinkの画面へ遷移しApache Flink アプリケーションタブからストリーミングアプリケーションを作成を押下
    CleanShot 2024-10-19 at 00.00.49@2x

  2. 下記設定を入力

    • セットアップ方法:一から作成
    • Apache Flink バージョン:Apache Flink 1.15
    • アプリケーション名:RCFSampleApplication
    • サービスロール:ApacheFlinkRCFExecutionRole
    • テンプレート:開発

    CleanShot 2024-10-19 at 00.23.47@2x

  3. 設定ボタンを押下
    CleanShot 2024-10-21 at 19.59.34@2x

  4. 下記を設定

    • Amazon S3バケット:s3://rcf-jar-repository-20241018
    • S3 オブジェクトへのパス:flink-random-cut-forest-example.jar
    • ランタイムプロバティ(グループID:RCFSampleApplcationEnvironment
      • inputStreamName:In-RCFData-Stream
      • outputStreamName:Out-RCFData-Stream
      • region:ap-northeast-1
      • snsTopicArn:事前に作成したSNSトピックのARN

    CleanShot 2024-10-20 at 01.20.45@2x

  5. 実行ボタンを押下
    CleanShot 2024-10-19 at 10.56.10@2x

  6. スナップショットなしで実行を選択して、アプリケーションを実行
    CleanShot 2024-10-19 at 10.56.39@2x

  7. 下記完了メッセージが表示されるまで待ちます。
    CleanShot 2024-10-19 at 10.59.18@2x

作成が完了したら動作確認していきます!

動作確認

CloudShellでPythonスクリプトを作成して実行します。
スクリプトの概要としては平均30度でIn-RCFData-Streamに平均30度の温度データ&1%の確率で外れ値(15度から45度)を送りRCFの検証に活用します。

send_temp.py
import json
import boto3
import random

# InputSteam名
STREAM_NAME = "In-RCFData-Stream"

def get_data(time):
    # 基準温度(平均30度)
    base_temp = 30.0

    # 通常の変動(-2度から+2度)
    normal_variation = random.uniform(-2, 2)

    # 1%の確率で異常値を生成
    if random.random() < 0.01:
        # 異常値は基準温度から大きく外れた値(例:15度から45度)
        temperature = random.uniform(15, 45)
    else:
        temperature = base_temp + normal_variation

    return {'time': time, 'value': round(temperature, 2)}

def generate(stream_name, kinesis_client):
    time = 0

    while True:
        data = get_data(time)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")

        print(f"Sent data: {data}")  # デバッグ用に送信データを表示
        time += 1

if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name='ap-northeast-1'))

下記コマンドを実行して、上記Pythonスクリプトをコピーしてpythonファイルを作成します。

実行コマンド
nano send_temp.py

CleanShot 2024-10-19 at 11.02.29@2x
CleanShot 2024-10-19 at 11.02.54@2x

ファイルが作成し終えたら、スクリプトを実行します。
実行すると送信ログが出力されます。

実行コマンド
python3 send_temp.py

CleanShot 2024-10-19 at 11.03.38@2x

それぞれのStreamにデータが送信されているか確認します。

In-RCFData-Stream

Input用のStreamとなります。
適切に配信されていますね。

CleanShot 2024-10-21 at 19.49.04@2x

Out-RCFData-Stream

Output用のStreamとなります。
scoreに異常検知のスコアが出力されます。今回はscoreが3以上のデータを出力されています。
下記画像で言うと平均が30度なので42.19度は異常値として検出されていますね。

CleanShot 2024-10-21 at 19.50.25@2x

メール記録

メールも同様に異常値が通知されていますね!

CleanShot 2024-10-20 at 01.27.53@2x

以上で動作確認も含めて完了です!

おわりに

Amazon Managed Service for Apache Flinkでの異常分析(Random Cut Forest)のやり方はいかがでしたでしょうか。
AWSが提供しているサンプルに従って実装するとお手軽に実装できますね!

本記事が少しでも参考になったら幸いです。
最後までご覧いただきありがとうございました。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.